feat: Add support for reading whole text files to read_text #6354

plotor wants to merge 1 commit into Eventual-Inc:main.

Conversation
Force-pushed from 3abc79b to ecaddb9.
Greptile Summary

This PR adds a …

Key observations:
Confidence Score: 4/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant User
    participant read_text (Python)
    participant TextSourceConfig
    participant stream_scan_task (Rust)
    participant stream_text (Rust)
    participant read_into_whole_text_stream
    participant read_into_line_chunk_stream
    User->>read_text (Python): read_text(path, whole_text=True/False)
    read_text (Python)->>TextSourceConfig: TextSourceConfig(encoding, skip_blank_lines, whole_text, ...)
    TextSourceConfig-->>read_text (Python): config
    read_text (Python)-->>User: DataFrame (lazy)
    User->>stream_scan_task (Rust): collect / execute
    stream_scan_task (Rust)->>stream_text (Rust): TextConvertOptions{whole_text, limit, ...}
    alt whole_text = true
        stream_text (Rust)->>read_into_whole_text_stream: uri, convert_options, ...
        read_into_whole_text_stream->>read_into_whole_text_stream: check limit == Some(0) → early return
        read_into_whole_text_stream->>read_into_whole_text_stream: read_to_string()
        read_into_whole_text_stream->>read_into_whole_text_stream: skip if blank & skip_blank_lines
        read_into_whole_text_stream-->>stream_text (Rust): Stream<String> (0 or 1 item per file)
    else whole_text = false
        stream_text (Rust)->>read_into_line_chunk_stream: uri, convert_options, ...
        read_into_line_chunk_stream->>read_into_line_chunk_stream: iterate lines, chunk, respect limit
        read_into_line_chunk_stream-->>stream_text (Rust): Stream<Vec<String>>
    end
    stream_text (Rust)-->>stream_scan_task (Rust): BoxStream<RecordBatch>
    stream_scan_task (Rust)-->>User: RecordBatches
```
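The flow above can be modeled in plain Python. This is only an illustrative sketch of the described semantics, not Daft's actual code; the function and parameter names loosely mirror the identifiers in the diagram.

```python
# Illustrative plain-Python model of the dispatch in the diagram above -- not
# Daft's actual code; names loosely mirror the Rust/Python identifiers.
def stream_text(lines, whole_text, limit=None, skip_blank_lines=False):
    """Rows a single file contributes under each mode."""
    if whole_text:
        # Whole-text path: the file yields at most one row.
        if limit == 0:
            return []  # limit pushdown: nothing left to produce
        content = "\n".join(lines)
        if skip_blank_lines and content.strip() == "":
            return []  # skip a file whose entire content is blank
        return [content]
    # Line path: one row per line, honoring the limit.
    out = []
    for line in lines:
        if limit is not None and len(out) >= limit:
            break
        if skip_blank_lines and line.strip() == "":
            continue
        out.append(line)
    return out
```
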
```rust
Ok(try_stream! {
    let mut content = String::new();
    reader.read_to_string(&mut content).await?;

    // Apply skip_blank_lines if needed (for whole file, this means skip if entire content is blank)
    if convert_options.skip_blank_lines && content.trim().is_empty() {
        return;
    }

    yield content;
})
```
Limit pushdown not respected in whole_text mode
convert_options.limit is completely ignored inside read_into_whole_text_stream. In the existing line-oriented path (read_into_line_chunk_stream), the limit is enforced via a remaining counter that short-circuits the loop when it reaches zero. Here, if a scan task is given limit = Some(0) (i.e., the overall query limit is already satisfied by prior scan tasks), this function will still read the entire file and yield one row — producing incorrect results.
A minimal guard at the top of the try_stream! block would address this:
```diff
 Ok(try_stream! {
+    // Respect limit pushdown: in whole-text mode each file is exactly one row.
+    if convert_options.limit == Some(0) {
+        return;
+    }
     let mut content = String::new();
     reader.read_to_string(&mut content).await?;
     // Apply skip_blank_lines if needed (for whole file, this means skip if entire content is blank)
     if convert_options.skip_blank_lines && content.trim().is_empty() {
         return;
     }
     yield content;
 })
```
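Why the guard matters can be shown with a toy model in plain Python (illustrative names, not the Rust code): with the guard in place, a scan task whose limit is already satisfied never reads the file at all.

```python
# Toy model (plain Python, illustrative names) of the limit guard: with it,
# a scan task whose limit is already satisfied skips the read entirely.
reads = []

def fake_read_file():
    reads.append(1)  # record that the "file" was actually read
    return "entire file contents"

def whole_text_rows(read_file, limit):
    if limit == 0:
        return []  # the guard: limit satisfied upstream, skip the read
    return [read_file()]
```
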
Force-pushed from ecaddb9 to 977676c.
This is a supplementary implementation for #6111, adding a `whole_text` option.
@greptile update summary
```rust
async fn read_into_whole_text_stream(
    uri: String,
    convert_options: TextConvertOptions,
    read_options: TextReadOptions,
    io_client: Arc<IOClient>,
    io_stats: Option<IOStatsRef>,
) -> DaftResult<impl Stream<Item = DaftResult<String>> + Send> {
    let buffer_size = read_options.buffer_size.unwrap_or(8 * 1024 * 1024);

    let reader: Box<dyn AsyncBufRead + Unpin + Send> = match io_client
        .single_url_get(uri.clone(), None, io_stats)
        .await?
    {
        GetResult::File(file) => Box::new(BufReader::with_capacity(
            buffer_size,
            File::open(file.path).await?,
        )),
        GetResult::Stream(stream, ..) => Box::new(BufReader::with_capacity(
            buffer_size,
            StreamReader::new(stream),
        )),
    };

    // If file is compressed, wrap stream in decoding stream.
    let mut reader: Box<dyn AsyncBufRead + Unpin + Send> = match CompressionCodec::from_uri(&uri) {
        Some(compression) => Box::new(BufReader::with_capacity(
            buffer_size,
            compression.to_decoder(reader),
        )),
        None => reader,
    };

    Ok(try_stream! {
        // Check limit first - if limit is 0, don't read the file at all
        if convert_options.limit == Some(0) {
            return;
        }

        let mut content = String::new();
        reader.read_to_string(&mut content).await?;

        // Apply skip_blank_lines if needed (for whole file, this means skip if entire content is blank)
        if convert_options.skip_blank_lines && content.trim().is_empty() {
            return;
        }

        yield content;
    })
}
```
Two separate functions instead of a single parametrized helper
read_into_whole_text_stream duplicates the file-open / compression-wrapping boilerplate that already exists in read_into_line_chunk_stream (the single_url_get match arm, the CompressionCodec wrapping). Keeping them as separate free-standing functions means any future change to how files are opened or decompressed must be made in two places.
Consider extracting the shared setup into a small helper:
```rust
async fn open_reader(
    uri: &str,
    buffer_size: usize,
    io_client: Arc<IOClient>,
    io_stats: Option<IOStatsRef>,
) -> DaftResult<Box<dyn AsyncBufRead + Unpin + Send>> {
    let raw: Box<dyn AsyncBufRead + Unpin + Send> = match io_client
        .single_url_get(uri.to_string(), None, io_stats)
        .await?
    {
        GetResult::File(file) => Box::new(BufReader::with_capacity(
            buffer_size,
            File::open(file.path).await?,
        )),
        GetResult::Stream(stream, ..) => Box::new(BufReader::with_capacity(
            buffer_size,
            StreamReader::new(stream),
        )),
    };
    Ok(match CompressionCodec::from_uri(uri) {
        Some(codec) => Box::new(BufReader::with_capacity(buffer_size, codec.to_decoder(raw))),
        None => raw,
    })
}
```

Both read_into_whole_text_stream and read_into_line_chunk_stream would then call open_reader, keeping the algorithm-specific logic in each while eliminating the duplication.
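For illustration only, the same factoring can be sketched in plain Python, with `io`/`gzip` standing in for Daft's IO client and compression codecs (all names here are hypothetical):

```python
# Plain-Python illustration of extracting shared reader setup -- hypothetical
# names; io/gzip stand in for Daft's IO client and compression codecs.
import gzip
import io

def open_reader(data: bytes, compressed: bool):
    """Shared setup: wrap raw bytes, then optionally add a decompression layer."""
    raw = io.BytesIO(data)
    if compressed:
        return io.TextIOWrapper(gzip.GzipFile(fileobj=raw))
    return io.TextIOWrapper(raw)

def read_whole_text(data: bytes, compressed: bool = False) -> str:
    # Whole-text path: only the algorithm-specific logic lives here.
    return open_reader(data, compressed).read()

def read_lines(data: bytes, compressed: bool = False) -> list:
    # Line path: same shared setup, different consumption.
    return [line.rstrip("\n") for line in open_reader(data, compressed)]
```

A future change to how sources are opened or decompressed then touches only `open_reader`.
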
Rule used: "Prefer single parametrized functions over multiple..." (learnt from Eventual-Inc/Daft#5207).
```rust
    io_client: Arc<IOClient>,
    io_stats: Option<IOStatsRef>,
) -> DaftResult<impl Stream<Item = DaftResult<String>> + Send> {
    let buffer_size = read_options.buffer_size.unwrap_or(8 * 1024 * 1024);
```
Inconsistent default buffer size for local files
In read_into_line_chunk_stream, local files (GetResult::File) use a 256 KiB default buffer:

```rust
let buffer_size = read_options.buffer_size.unwrap_or(256 * 1024);
```

Here, read_into_whole_text_stream always falls back to 8 MiB regardless of whether the source is a local file or a remote stream. For small local files this allocates a 32× larger buffer than necessary.
Consider using distinct defaults for local vs. remote sources to stay consistent with the rest of the codebase:
```rust
let buffer_size = read_options.buffer_size.unwrap_or(8 * 1024 * 1024);
```
(or mirror the GetResult::File / GetResult::Stream split that read_into_line_chunk_stream uses).
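A minimal sketch of such per-source defaults, using the 256 KiB / 8 MiB figures from the comment above (plain Python, hypothetical helper name):

```python
# Hedged sketch of per-source buffer defaults; sizes come from the review
# comment above, the helper name is hypothetical.
LOCAL_DEFAULT = 256 * 1024        # 256 KiB for local files
REMOTE_DEFAULT = 8 * 1024 * 1024  # 8 MiB for remote streams

def default_buffer_size(is_local_file, override=None):
    """Pick a buffer size: an explicit override wins, else a per-source default."""
    if override is not None:
        return override
    return LOCAL_DEFAULT if is_local_file else REMOTE_DEFAULT
```
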
desmondcheongzx left a comment:
Mostly looks good, thanks for the PR @plotor! I left some nits, but they're non-blocking.
```rust
        yield content;
    })
}
```
Not a huge deal, but I think greptile is right here!
```rust
// Check limit first - if limit is 0, don't read the file at all
if convert_options.limit == Some(0) {
    return;
}
```
Looks like we're missing a test for this path
Force-pushed from 977676c to 384ebad.
Signed-off-by: plotor <zhenchao.wang@hotmail.com>
Force-pushed from 384ebad to eb50eb4.
@desmondcheongzx Thanks for the review, please take a look again~
Changes Made

Add a `whole_text` option to the `read_text` API to support reading the whole text content as a single row. Consider inference scenarios, where the content of a file might be a complete prompt; in that case it shouldn't be read line by line.

Related Issues